/**
 * Copyright (c) 2015-2017, Winter Lau (javayou@gmail.com).
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.crazymaker.l2cache.cluster.level3;

import cn.hutool.core.collection.CollectionUtil;
import com.crazymaker.l2cache.config.J2CacheCoreConfig;
import com.crazymaker.l2cache.manager.Command;
import com.crazymaker.springcloud.common.mpsc.ServiceThread;
import com.crazymaker.springcloud.common.util.JsonUtil;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import com.google.common.collect.Queues;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OkHttpL3Policy implements L3Policy {

    private static final Logger LOG = LoggerFactory.getLogger(OkHttpL3Policy.class);
    private static LinkedBlockingQueue<Command> hotKeyCmdQueue = new LinkedBlockingQueue<>();


    private String nginxUrls;

    private static OkHttpClient client = new OkHttpClient();

    NginxSenderService nginxSenderService;

    public OkHttpL3Policy(J2CacheCoreConfig props) {
        this.nginxUrls = props.getNginxUrls();
        nginxSenderService = new NginxSenderService();
        nginxSenderService.start();
    }

    public OkHttpL3Policy( String nginxUrls) {
        this.nginxUrls = nginxUrls;
        nginxSenderService = new NginxSenderService();
        nginxSenderService.start();
    }

    String[] nginxIpPorts;

    /**
     * 删除本地某个缓存条目
     *
     * @param region 区域名称
     * @param keys   缓存键值
     */
    public void evict(String region, String... keys) {

    }

    /**
     * 清除本地整个缓存区域
     *
     * @param region 区域名称
     */
    public void clear(String region) {

    }


    @Override
    public void hotCache(Command cmd) {
        hotKeyCmdQueue.add(cmd);
        nginxSenderService.wakeup();


    }


    class NginxSenderService extends ServiceThread {

        @Override
        public void run() {
            LOG.debug(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                this.waitForRunning(10000);

                //业务代码
                List<Command> tempList = new ArrayList<>();
                //每10ms推送一次
                try {
                    Queues.drain(hotKeyCmdQueue, tempList, 10, 10, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (CollectionUtil.isEmpty(tempList)) {
                    continue;
                }
                String[] keys = tempList.stream().flatMap(m -> Arrays.stream(m.getKeys())).toArray(String[]::new);

                String json = JsonUtil.pojoToJson(keys);
                String[] urls = getNginxIpPorts();
                httpSend(json, urls);
            }

            LOG.debug(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return "L3Cache Hot";
        }
    }


    private String[] getNginxIpPorts() {
        if (null == nginxIpPorts) {
            nginxIpPorts = nginxUrls.split(",");

        }
        return nginxIpPorts;

    }

    public static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");

    private static void httpSend(String json, String[] urls) {
        /**
         * 所有HTTP请求的代理设置，超时，缓存设置等都需要在OkHttpClient中设置。
         * 如果需要更改一个请求的配置，可以使用
         * OkHttpClient.newBuilder()获取一个builder对象，
         * 该builder对象与原来OkHttpClient共享相同的连接池，配置等。
         */
        client = client.newBuilder().build();

        RequestBody requestBody = RequestBody.create(JSON_MEDIA_TYPE, json);

        for (String url : urls) {

            Request request = new Request.Builder()
                    .url(url)
                    .post(requestBody)
                    .build();

            Call call = client.newCall(request);
            call.enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {
                    LOG.error("HttpSender  onFailure", e);
                    call.cancel();
                }

                @Override
                public void onResponse(Call call, Response response) throws IOException {

                    LOG.info("HttpSender  ok: "+new String(response.body().bytes()));
                    response.body().close();
                }
            });
        }
    }

    public static void main(String[] args) {
        String nginxUrls="http://192.168.56.121:8080/cache/hot";
        String[] keys={"seckill_sku:1","seckill_sku:2","seckill_sku:3"};

        OkHttpL3Policy policy=new OkHttpL3Policy(nginxUrls);
        while (true) {
            Command command = new Command(Command.OPT_PUT_KEY, null, keys);
            policy.hotCache(command);
            ThreadUtil.sleepMilliSeconds(1000);
        }


    }
}
